AWS Glue の Pushdown Predicates を用いてすべてのファイルを読み込むことなく、パーティションをプレフィルタリングする
はじめに
AWS Glueが自動生成するETLコード(PySpark)では、ソースデータをDynamicFrame内部のRDDに読み込み、後続の条件に基づいてデータのフィルタ・変換を繰り返し、最終的にターゲットデータに出力します。一般的なユースケースではすべてのデータが対象で構いませんが、この動作は特定のパーティションのデータのみが対象の場合でも全てのデータの読み込みが発生しまうことを意味します。
例えば、過去1年間のデータを日毎にパーティションしている外部テーブルあるとします。必要なデータがこのテーブルの最新の日付のデータのみであっても、すべてを読み込んだ後、後続の条件に基づいてデータをフィルタすることが必要になります。今回ご紹介するPushdown Predicates引数には、必要なパーティションのデータのみを読み込む指定ができます。不要なデータの読み込みやRDDオブジェクトの生成・破棄のコストを削減できます。つまり、最新の日付のデータのみであった場合、1/365の読み取りで済みます。
Pushdown Predicates とは
AWS Gule の Pushdown Predicates とは、データ(例.S3上のファイル)に対してAWS Glueの各ワーカーが必要なパーティションのデータのみを読み込んでRDDを生成し、後続のフィルタ・変換処理に引渡す、といったプロセスをとります。不要なデータを読まないことでデータの生成・破棄のコストが下がり、結果的にパフォーマンスが向上、、コスト削減が期待できます。
指定方法
push_down_predicate引数には、その述語式を指定します。述語式は、Spark SQLでサポートされている任意のブール式やSpark SQLクエリでWHERE句に入れることができるものはすべて動作します。詳細は、Apache Spark SQL documentation、特にScala SQL functions referenceを参照してください。
(変更前)
datasource0 = glueContext.create_dynamic_frame.from_catalog( database = "default", table_name = "elb_parquet", transformation_ctx = "datasource0")
(変更後)
partition_predicate="year='2015' and month='01' and day='01'" datasource0 = glueContext.create_dynamic_frame.from_catalog( database = "default", table_name = "elb_parquet", push_down_predicate = partition_predicate transformation_ctx = "datasource0")
例.1ヶ月のELBのログから一日のデータをフィルタする
では、実際にフィルタ動作を確認してみましょう。
1ヶ月のELBのログテーブルの作成
圧縮済みファイル(snappy)11 GBのデータファイルに対してテーブル定義します。
$ aws s3 ls s3://athena-examples-us-east-1/elb/parquet/year=2015/month=1/ --recursive --sum --human 2017-02-16 09:42:23 364.8 MiB elb/parquet/year=2015/month=1/day=1/part-r-00156-e764ec48-9e47-4c2c-8d00-68b3a8534cc2.snappy.parquet 2017-02-16 09:42:23 367.6 MiB elb/parquet/year=2015/month=1/day=10/part-r-00104-e764ec48-9e47-4c2c-8d00-68b3a8534cc2.snappy.parquet : : 2017-02-16 09:43:34 359.4 MiB elb/parquet/year=2015/month=1/day=9/part-r-00013-e764ec48-9e47-4c2c-8d00-68b3a8534cc2.snappy.parquet Total Objects: 31 Total Size: 11.0 GiB
CREATE EXTERNAL TABLE `elb_parquet`( `request_timestamp` string, `elb_name` string, `request_ip` string, `request_port` int, `backend_ip` string, `backend_port` int, `request_processing_time` double, `backend_processing_time` double, `client_response_time` double, `elb_response_code` string, `backend_response_code` string, `received_bytes` bigint, `sent_bytes` bigint, `request_verb` string, `url` string, `protocol` string, `user_agent` string, `ssl_cipher` string, `ssl_protocol` string) PARTITIONED BY ( `day` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://athena-examples-ap-northeast-1/elb/parquet/year=2015/month=1/' TBLPROPERTIES ( 'CrawlerSchemaDeserializerVersion'='1.0', 'CrawlerSchemaSerializerVersion'='1.0', 'UPDATED_BY_CRAWLER'='elb_parquet', 'classification'='parquet', 'compressionType'='none', 'typeOfData'='file'); MSCK REPAIR TABLE elb_parquet;
方式1:Filterによるパーティションのフィルタリング
ビルトインの Filter Transformを用いて、1日のデータを取得します。
1ヶ月分の全てのデータを読み込み、DynamicFramedatasource0
を生成した後、Filter Transformで、day
の値が1
のデータのDynamicFramefiltering
を再生成します。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "default", table_name = "elb_parquet", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "elb_parquet", transformation_ctx = "datasource0") print "Full record count: ", datasource0.count() datasource0.printSchema() ## @type: Filter ## @args: [f = lambda x: x["day"] = "1", transformation_ctx = "filtering"] ## @return: filtering ## @inputs: [frame = applymapping1] filtering = Filter.apply(frame = datasource0, f = lambda x: x["day"] == "1", transformation_ctx = "filtering") print "Filtered record count: ", filtering.count() ## @type: ApplyMapping ## @args: [mapping = [("request_timestamp", "string", "request_timestamp", "string"), ("elb_name", "string", "elb_name", "string"), ("request_ip", "string", "request_ip", "string"), ("request_port", "int", "request_port", "int"), ("backend_ip", "string", "backend_ip", "string"), ("backend_port", "int", "backend_port", "int"), ("request_processing_time", "double", "request_processing_time", "double"), ("backend_processing_time", "double", "backend_processing_time", "double"), ("client_response_time", "double", "client_response_time", "double"), ("elb_response_code", "string", "elb_response_code", "string"), ("backend_response_code", "string", "backend_response_code", "string"), ("received_bytes", "long", "received_bytes", "long"), ("sent_bytes", "long", "sent_bytes", "long"), ("request_verb", "string", "request_verb", "string"), ("url", "string", "url", "string"), ("protocol", "string", "protocol", "string"), ("user_agent", "string", "user_agent", "string"), ("ssl_cipher", "string", "ssl_cipher", "string"), ("ssl_protocol", "string", "ssl_protocol", "string")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("request_timestamp", "string", "request_timestamp", "string"), ("elb_name", "string", "elb_name", "string"), ("request_ip", "string", "request_ip", "string"), ("request_port", "int", "request_port", "int"), ("backend_ip", "string", "backend_ip", "string"), ("backend_port", "int", "backend_port", "int"), ("request_processing_time", "double", "request_processing_time", "double"), ("backend_processing_time", "double", "backend_processing_time", "double"), ("client_response_time", "double", "client_response_time", "double"), ("elb_response_code", "string", "elb_response_code", "string"), ("backend_response_code", "string", "backend_response_code", "string"), ("received_bytes", "long", "received_bytes", "long"), ("sent_bytes", "long", "sent_bytes", "long"), ("request_verb", "string", "request_verb", "string"), ("url", "string", "url", "string"), ("protocol", "string", "protocol", "string"), ("user_agent", "string", "user_agent", "string"), ("ssl_cipher", "string", "ssl_cipher", "string"), ("ssl_protocol", "string", "ssl_protocol", "string")], transformation_ctx = "applymapping1") ## @type: DataSink ## @args: [connection_type = "s3", connection_options = {"path": "s3:///elb_filtered"}, format = "csv", transformation_ctx = "datasink2"] ## @return: datasink2 ## @inputs: [frame = applymapping1] datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3:///elb_filtered"}, format = "csv", transformation_ctx = "datasink2") job.commit()
実行時間は、DPU「10」で約28分でした。
方式2:push_down_predicateによるパーティションのプレフィルタリング
DynamicFrameを生成する際にpush_down_predicateに指定したパーティションのみを読み込み、DynamicFrameを生成します。必要なパーティションの読み込みや、DynamicFrameの再生成が不要になります。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "default", table_name = "elb_parquet", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "elb_parquet", push_down_predicate = "day='1'", transformation_ctx = "datasource0") print "Full record count: ", datasource0.count() datasource0.printSchema() ## @type: ApplyMapping ## @args: [mapping = [("request_timestamp", "string", "request_timestamp", "string"), ("elb_name", "string", "elb_name", "string"), ("request_ip", "string", "request_ip", "string"), ("request_port", "int", "request_port", "int"), ("backend_ip", "string", "backend_ip", "string"), ("backend_port", "int", "backend_port", "int"), ("request_processing_time", "double", "request_processing_time", "double"), ("backend_processing_time", "double", "backend_processing_time", "double"), ("client_response_time", "double", "client_response_time", "double"), ("elb_response_code", "string", "elb_response_code", "string"), ("backend_response_code", "string", "backend_response_code", "string"), ("received_bytes", "long", "received_bytes", "long"), ("sent_bytes", "long", "sent_bytes", "long"), ("request_verb", "string", "request_verb", "string"), ("url", "string", "url", "string"), ("protocol", "string", "protocol", "string"), ("user_agent", "string", "user_agent", "string"), ("ssl_cipher", "string", "ssl_cipher", "string"), ("ssl_protocol", "string", "ssl_protocol", "string")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("request_timestamp", "string", "request_timestamp", "string"), ("elb_name", "string", "elb_name", "string"), ("request_ip", "string", "request_ip", "string"), ("request_port", "int", "request_port", "int"), ("backend_ip", "string", "backend_ip", "string"), ("backend_port", "int", "backend_port", "int"), ("request_processing_time", "double", "request_processing_time", "double"), ("backend_processing_time", "double", "backend_processing_time", "double"), ("client_response_time", "double", "client_response_time", "double"), ("elb_response_code", "string", "elb_response_code", "string"), ("backend_response_code", "string", "backend_response_code", "string"), ("received_bytes", "long", "received_bytes", "long"), ("sent_bytes", "long", "sent_bytes", "long"), ("request_verb", "string", "request_verb", "string"), ("url", "string", "url", "string"), ("protocol", "string", "protocol", "string"), ("user_agent", "string", "user_agent", "string"), ("ssl_cipher", "string", "ssl_cipher", "string"), ("ssl_protocol", "string", "ssl_protocol", "string")], transformation_ctx = "applymapping1") ## @type: DataSink ## @args: [connection_type = "s3", connection_options = {"path": "s3:///elb_prefiltered"}, format = "csv", transformation_ctx = "datasink2"] ## @return: datasink2 ## @inputs: [frame = applymapping1] datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3:///elb_prefiltered"}, format = "csv", transformation_ctx = "datasink2") job.commit()
実行時間は、DPU「10」で約2分でした。
考察
方式2は、方式1の14倍速いという結果でした。パフォーマンスの改善の理由は以下の2点と考えられます。
- 方式2は、ソースデータの読み込みが1/31に削減できた
- 方式2は、フィルター適用後のDynamicFrameの再作成が不要
上記の性能改善はあったが、DynamicFrameからCSVファイル出力する時間は変わらないため、31倍ではなく14倍程度になったのではないかと考えられます。
AWS Glueにおけるデータフィルタリングのベストプラクティス
方式1と2を比較しましたが、どちらかが優れているというものではなく、用途に応じて組み合わせることが効果的です。つまり、
- パーティションしているテーブルは、方式2によってパーティションキーでフィルタリングする
- 生成されたDynamicFrameの情報は、方式1によってフィルタリングする
- 上記を方式2、方式1の順に適用する
ということがベストプラクティスです。
最後に
AWS Glueのデータソースは、S3上のデータファイルを直接データソースとするのではなく、データカタログの情報に基づきS3上のデータファイルをスキャンできます。この方式を採用することで不要なパーティションのデータを読み込むことなく、初期のDynamicFrame生成の際にフィルタリングできます。この機能はETLコードのデバックの際に、入力データを絞込む目的で利用することも効果的です。
テーブル設計の際には、パーティショニングを常に心がけておくことで、2つのフィルタリング方式を用途に応じて組み合わせることでパフォーマンス向上、コスト削減が期待できます。テラバイト以上のソースデータを取り扱う場合は、ぜひご検討ください。